昨天有簡單敘述一下 Autoencoder 的背後理論。
Autoencoder 是一種神經網絡架構,主要用於無監督學習和特徵提取。
它的主要目標是將輸入數據編碼成一種低維度的表示形式,然後再解碼回原始輸入數據。
這個過程有助於捕捉數據中的關鍵特徵,同時減少冗餘信息。
今天針對 Variational Autoencoder 來實做。
相較一般 Autoencoder,Variational Autoencoder 不僅學習壓縮表示,還學習了潛在空間的統計分佈。
這裡使用的是 MNIST Dataset,一個手寫數字的資料集,裡面的每張圖片都是一個手寫數字圖片,並且標示對應的數字。
圖片的大小是,顏色格式是灰階。
import numpy as npy
from PIL import Image
try:
import cupy as np
except ImportError:
import numpy as np
from vae.utils.functionals import initialise_weight, initialise_bias
from vae.utils.functionals import relu, sigmoid
eps = 10e-8
class Encoder(object):
def __init__(self, input_channels, layer_size, nz,
batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
"""
"""
self.input_channels = input_channels
self.nz = nz
self.batch_size = batch_size
self.layer_size = layer_size
# Initialise encoder weight
self.W0 = initialise_weight(self.input_channels, self.layer_size)
self.b0 = initialise_bias(self.layer_size)
self.W_mu = initialise_weight(self.layer_size, self.nz)
self.b_mu = initialise_bias(self.nz)
self.W_logvar = initialise_weight(self.layer_size, self.nz)
self.b_logvar = initialise_bias(self.nz)
# Adam optimiser momentum and velocity
self.lr = lr
self.momentum = [0.0] * 6
self.velocity = [0.0] * 6
self.beta1 = beta1
self.beta2 = beta2
self.t = 0
def forward(self, x):
"""
"""
self.e_input = x.reshape((self.batch_size, -1))
# Dimension check on input
assert self.e_input.shape == (self.batch_size, self.input_channels)
self.h0_l = self.e_input.dot(self.W0) + self.b0
self.h0_a = relu(self.h0_l)
self.logvar = self.h0_a.dot(self.W_logvar) + self.b_logvar
self.mu = self.h0_a.dot(self.W_mu) + self.b_mu
self.rand_sample = np.random.standard_normal(size=(self.batch_size, self.nz))
self.sample_z = self.mu + np.exp(self.logvar * .5) * self.rand_sample
return self.sample_z, self.mu, self.logvar
def optimise(self, grads):
"""
"""
# ---------------------------
# Optimise using Adam
# ---------------------------
self.t += 1
# Calculate gradient with momentum and velocity
for i, grad in enumerate(grads):
self.momentum[i] = self.beta1 * self.momentum[i] + (1 - self.beta1) * grad
self.velocity[i] = self.beta2 * self.velocity[i] + (1 - self.beta2) * np.power(grad, 2)
m_h = self.momentum[i] / (1 - (self.beta1 ** self.t))
v_h = self.velocity[i] / (1 - (self.beta2 ** self.t))
grads[i] = m_h / np.sqrt(v_h + eps)
grad_W0, grad_b0, grad_W_mu, grad_b_mu, grad_W_logvar, grad_b_logvar = grads
# Update weights
self.W0 = self.W0 - self.lr * np.sum(grad_W0, axis=0)
self.b0 = self.b0 - self.lr * np.sum(grad_b0, axis=0)
self.W_mu = self.W_mu - self.lr * np.sum(grad_W_mu, axis=0)
self.b_mu = self.b_mu - self.lr * np.sum(grad_b_mu, axis=0)
self.W_logvar = self.W_logvar - self.lr * np.sum(grad_W_logvar, axis=0)
self.b_logvar = self.b_logvar - self.lr * np.sum(grad_b_logvar, axis=0)
return
def backward(self, x, grad_dec):
"""
"""
# ----------------------------------------
# Calculate gradients from reconstruction
# ----------------------------------------
y = np.reshape(x, (self.batch_size, -1))
db_mu = grad_dec
dW_mu = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(grad_dec, axis=1))
db_logvar = grad_dec * np.exp(self.logvar * .5) * .5 * self.rand_sample
dW_logvar = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(db_logvar, axis=1))
drelu = relu(self.h0_l, derivative=True)
db0 = drelu * (db_mu.dot(self.W_mu.T) + db_logvar.dot(self.W_logvar.T))
dW0 = np.matmul(np.expand_dims(y, axis=-1), np.expand_dims(db0, axis=1))
# ----------------------------------------
# Calculate gradients from K-L
# ----------------------------------------
# logvar terms
dKL_b_logvar = .5 * (np.exp(self.logvar) - 1)
dKL_W_logvar = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dKL_b_logvar, axis=1))
# mu terms
dKL_b_mu = .5 * 2 * self.mu
dKL_W_mu = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dKL_b_mu, axis=1))
dKL_b0 = drelu * (dKL_b_logvar.dot(self.W_logvar.T) + dKL_b_mu.dot(self.W_mu.T))
dKL_W0 = np.matmul(np.expand_dims(y, axis=-1), np.expand_dims(dKL_b0, axis=1))
# Combine gradients for encoder from recon and KL
grad_b_logvar = dKL_b_logvar + db_logvar
grad_W_logvar = dKL_W_logvar + dW_logvar
grad_b_mu = dKL_b_mu + db_mu
grad_W_mu = dKL_W_mu + dW_mu
grad_b0 = dKL_b0 + db0
grad_W0 = dKL_W0 + dW0
grads = [grad_W0, grad_b0, grad_W_mu, grad_b_mu, grad_W_logvar, grad_b_logvar]
# Optimise step
self.optimise(grads)
return
cupy
是一個能夠在 GPU 上執行的 numpy 替代品,用於加速計算
__init__(self, input_channels, layer_size, nz, batch_size, lr, beta1, beta2)
初始化了編碼器的各種參數,包括
forward(self, x)
這裡實做前向傳播過程。
它接受輸入 x
,對其進行編碼並計算出潛在變數(latent variables) sample_z
、均值(mu)和對數變異數(logvar)。
optimise(self, grads)
優化器的優化步驟,使用Adam優化算法來更新權重和偏差。
backward(self, x, grad_dec)
實現了反向傳播過程,計算了從重建損失(reconstruction loss)和KL散度(Kullback-Leibler divergence)中獲得的梯度,然後使用 optimise
方法來優化編碼器的權重和偏差。
import numpy as npy
from PIL import Image
try:
import cupy as np
except ImportError:
import numpy as np
from vae.utils.functionals import initialise_weight, initialise_bias
from vae.utils.functionals import MSELoss
from vae.utils.functionals import relu, sigmoid
eps = 10e-8
class Decoder(object):
def __init__(self, input_channels, layer_size, nz,
batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
self.input_channels = input_channels
self.nz = nz
self.batch_size = batch_size
self.layer_size = layer_size
# Initialise decoder weight
self.W0 = initialise_weight(self.nz, self.layer_size)
self.b0 = initialise_bias(self.layer_size)
self.W1 = initialise_weight(self.layer_size, self.input_channels)
self.b1 = initialise_bias(self.input_channels)
# Adam optimiser momentum and velocity
self.lr = lr
self.momentum = [0.0] * 4
self.velocity = [0.0] * 4
self.beta1 = beta1
self.beta2 = beta2
self.t = 0
def forward(self, z):
self.z = np.reshape(z, (self.batch_size, self.nz))
self.h0_l = self.z.dot(self.W0) + self.b0
self.h0_a = relu(self.h0_l)
self.h1_l = self.h0_l.dot(self.W1) + self.b1
self.h1_a = sigmoid(self.h1_l)
self.d_out = np.reshape(self.h1_a, (self.batch_size, self.input_channels))
return self.d_out
def optimise(self, grads):
"""
"""
# ---------------------------
# Optimise using Adam
# ---------------------------
self.t += 1
# Calculate gradient with momentum and velocity
for i, grad in enumerate(grads):
self.momentum[i] = (1 - self.beta1) * grad
self.velocity[i] = self.beta2 * self.velocity[i] + (1 - self.beta2) * np.power(grad, 2)
m_h = self.momentum[i] / (1 - (self.beta1 ** self.t))
v_h = self.velocity[i] / (1 - (self.beta2 ** self.t))
grads[i] = m_h / np.sqrt(v_h + eps)
grad_dW0, grad_db0, grad_dW1, grad_db1 = grads
# Update weights
self.W0 = self.W0 - self.lr * np.sum(grad_dW0, axis=0)
self.b0 = self.b0 - self.lr * np.sum(grad_db0, axis=0)
self.W1 = self.W1 - self.lr * np.sum(grad_dW1, axis=0)
self.b1 = self.b1 - self.lr * np.sum(grad_db1, axis=0)
return
def backward(self, x, out):
# ----------------------------------------
# Calculate gradients from reconstruction
# ----------------------------------------
y = np.reshape(x, (self.batch_size, -1))
out = np.reshape(out, (self.batch_size, -1))
dL = MSELoss(out, y, derivative=True)
dSig = sigmoid(self.h1_l, derivative=True)
dL_dSig = dL * dSig
grad_db1 = dL_dSig
grad_dW1 = np.matmul(np.expand_dims(self.h0_a, axis=-1), np.expand_dims(dL_dSig, axis=1))
drelu0 = relu(self.h0_l, derivative=True)
grad_db0 = grad_db1.dot(self.W1.T) * drelu0
grad_dW0 = np.matmul(np.expand_dims(self.z, axis=-1), np.expand_dims(grad_db0, axis=1))
# output gradient to the encoder layer
grad_dec = grad_db0.dot(self.W0.T)
grads = [grad_dW0, grad_db0, grad_dW1, grad_db1]
# Optimiser Step
self.optimise(grads)
return grad_dec
這裡實現了一個解碼器,用於將潛在變數轉換為生成的數據點,並使用Adam優化器來學習權重。
解碼器有兩個隱藏層,分別使用ReLU和Sigmoid激活函數。
在反向傳播中,計算了梯度,然後使用Adam優化器來更新權重。
import numpy as npy
from PIL import Image
try:
import cupy as np
except ImportError:
import numpy as np
from .encoder_model import Encoder
from .decoder_model import Decoder
eps = 10e-8
class VariationalAutoEncoder(object):
def __init__(self, input_channels, layer_size, nz,
batch_size=64, lr=1e-3, beta1=0.9, beta2=0.999):
self.input_channels = input_channels
self.nz = nz
self.batch_size = batch_size
self.layer_size = layer_size
# Construct encoder module
self.encoder = Encoder(input_channels, layer_size, nz,
batch_size=batch_size, lr=lr, beta1=beta1, beta2=beta2)
# Construct decoder module
self.decoder = Decoder(input_channels, layer_size, nz,
batch_size=batch_size, lr=lr, beta1=beta1, beta2=beta2)
def forward(self, x):
"""
"""
x = x.reshape((self.batch_size, -1))
# Feed forward encoder - decoder
sample_z, mu, logvar = self.encoder.forward(x)
out = self.decoder.forward(sample_z)
return out, mu, logvar
def backward(self, x, out):
"""
"""
grad_dec = self.decoder.backward(x, out)
self.encoder.backward(x, grad_dec)
from .encoder_model import Encoder
和 from .decoder_model import Decoder
引入了自定義的 Encoder
和 Decoder
類別。
eps = 10e-8
:定義了一個非常小的數值,避免除以零或其他數值積分的不穩定情況。
__init__(self, input_channels, layer_size, nz, batch_size, lr, beta1, beta2)
初始化 VAE 參數,包括:
input_channels
)nz
)batch_size
)lr
)在初始化過程中,建立了 Encoder
和 Decoder
的 module,這些模型將用於 VAE 的訓練和生成。
forward(self, x)
是 VAE 的前向傳播函式。
它接受輸入 x
,將其重塑為適當的形狀,然後通過 Encoder
模型將輸入壓縮為隱變數 sample_z
、平均值 mu
和對數方差 logvar
。
接著,它將 sample_z
通過 Decoder
模型解碼,並返回生成的輸出、平均值和對數方差。
backward(self, x, out)
是 VAE 的反向傳播函式。
它接受輸入 x
和前向傳播的輸出 out
,然後使用 Decoder
模型的反向傳播來計算梯度。
接著,它使用這些梯度來更新 Encoder
模型的權重,以便進行訓練。
明天,我們會透過 Kaggle competition 來使用 Variational Autoencoder 解決 Labelled Faces in the Wild (LFW) Dataset 所碰到的議題!!!